# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import cluster as CLUSTER
import database
import dateutil.parser
import utils
import os
import os.path
import prettytable
import shutil
import sys
import time

LOG = utils.get_logger()


def wait_service(need_alive, wait_timeout, cluster, fe_ids, be_ids):
    """Poll until every listed FE/BE reaches the requested liveness state.

    Args:
        need_alive: True to wait until the nodes are alive, False to wait
            until they are dead.
        wait_timeout: 0 returns at once without checking anything, -1 waits
            practically forever, any other value is the max seconds to wait.
        cluster: cluster the nodes belong to; used to look up the nodes and
            to obtain the db manager.
        fe_ids: ids of the frontends to check.
        be_ids: ids of the backends to check.

    Raises:
        Exception: when the deadline passes while some node is still in the
            wrong state; the message lists the offending fe / be ids.
    """
    if wait_timeout == 0:
        return
    if wait_timeout == -1:
        wait_timeout = 1000000000
    expire_ts = time.time() + wait_timeout

    def find_failed(node_type, node_ids, get_state, port_name):
        """Return the ids whose current liveness differs from need_alive."""
        failed = []
        for node_id in node_ids:
            node = cluster.get_node(node_type, node_id)
            state = get_state(node_id)
            # Normalize to a real bool: a missing state (None) must count as
            # dead. Without this, `None != False` reports an unknown node as
            # still alive while we are waiting for it to stop.
            alive = bool(state and state.alive)
            if alive and need_alive:
                # if need alive, check port available,
                # if need dead, don't check port available because it takes
                # some time for the socket to disconnect
                alive = bool(
                    utils.is_socket_avail(node.get_ip(),
                                          node.meta["ports"][port_name]))
            if alive != need_alive:
                failed.append(node_id)
        return failed

    while True:
        # A db manager is fetched on every round (presumably so that node
        # states are re-read -- confirm against database.get_db_mgr).
        db_mgr = database.get_db_mgr(cluster.name,
                                     cluster.get_all_node_net_infos(), False)
        failed_frontends = find_failed(CLUSTER.Node.TYPE_FE, fe_ids,
                                       db_mgr.get_fe, "query_port")
        failed_backends = find_failed(CLUSTER.Node.TYPE_BE, be_ids,
                                      db_mgr.get_be, "webserver_port")
        if not failed_frontends and not failed_backends:
            break
        if time.time() >= expire_ts:
            # Report the state the nodes are stuck in, i.e. the opposite of
            # the state we were waiting for.
            failed_status = "dead" if need_alive else "alive"
            err = ""
            if failed_frontends:
                err += failed_status + " fe: " + str(failed_frontends) + ". "
            if failed_backends:
                err += failed_status + " be: " + str(failed_backends) + ". "
            raise Exception(err)
        time.sleep(1)


# return for_all, related_nodes, related_node_num
def get_ids_related_nodes(cluster,
                          fe_ids,
                          be_ids,
                          ms_ids,
                          recycle_ids,
                          fdb_ids,
                          ignore_not_exists=False):
    """Resolve the per-type id selections into the matching cluster nodes.

    For each node type the ids argument means:
      * None       -> that type was not selected at all;
      * empty list -> every node of that type;
      * id list    -> exactly those nodes.

    Args:
        cluster: cluster to look the nodes up in.
        fe_ids, be_ids, ms_ids, recycle_ids, fdb_ids: id selections per type.
        ignore_not_exists: when True a failing node lookup is only logged as
            a warning; when False the lookup error is re-raised.

    Returns:
        A tuple (for_all, related_nodes, related_node_num). When no type was
        selected at all, related_nodes is None and for_all is True.
    """
    selections = (fe_ids, be_ids, ms_ids, recycle_ids, fdb_ids)
    if all(selection is None for selection in selections):
        return True, None, cluster.get_all_nodes_num()

    def resolve(node_type, wanted_ids):
        # None: type not selected; empty: every node of this type.
        if wanted_ids is None:
            return []
        if not wanted_ids:
            return cluster.get_all_nodes(node_type)

        found = []
        for node_id in wanted_ids:
            try:
                node = cluster.get_node(node_type, node_id)
            except Exception:
                if not ignore_not_exists:
                    raise
                LOG.warning(
                    utils.render_yellow("Not found {} with id {}".format(
                        node_type, node_id)))
            else:
                found.append(node)
        return found

    node_types = (
        CLUSTER.Node.TYPE_FE,
        CLUSTER.Node.TYPE_BE,
        CLUSTER.Node.TYPE_MS,
        CLUSTER.Node.TYPE_RECYCLE,
        CLUSTER.Node.TYPE_FDB,
    )

    selected = []
    for node_type, wanted_ids in zip(node_types, selections):
        selected += resolve(node_type, wanted_ids)

    selected_num = len(selected)
    return selected_num == cluster.get_all_nodes_num(), selected, selected_num


class Command(object):
    """Base class of a sub command.

    Subclasses implement add_parser() to register their argparse sub parser
    and run() to execute the command. The underscore helpers add the
    arguments shared by several sub commands.
    """

    def __init__(self, name):
        self.name = name

    def print_use_time(self):
        """Return True by default.

        NOTE(review): presumably tells the caller whether to print the elapsed
        time of the command -- confirm against the call site.
        """
        return True

    def add_parser(self, args_parsers):
        """Register this command's parser on args_parsers; must be overridden."""
        raise NotImplementedError("No implemented")

    def run(self, args):
        """Execute the command with the parsed args; must be overridden."""
        raise NotImplementedError("No implemented")

    def _add_parser_common_args(self, parser):
        """Add the -v/--verbose and --output-json options to parser."""
        parser.add_argument("-v",
                            "--verbose",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="verbose logging.")
        parser.add_argument("--output-json",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="output as json, and don't print log.")

    def _add_parser_ids_args(self, parser):
        """Add the --fe-id/--be-id/--ms-id/--recycle-id/--fdb-id options.

        Every option takes zero or more ints (nargs="*"), so it parses to
        None when absent and to an empty list when given without ids.
        """
        group = parser.add_argument_group("for existing nodes",
                                          "apply to the existing nodes.")
        group.add_argument(
            "--fe-id",
            nargs="*",
            type=int,
            help="Specify up fe ids, support multiple ids, "
            "if specific --fe-id but not specific ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3."
        )
        group.add_argument(
            "--be-id",
            nargs="*",
            type=int,
            help="Specify up be ids, support multiple ids, "
            "if specific --be-id but not specific ids, apply to all be. Example: '--be-id' will select all backends."
        )
        group.add_argument(
            "--ms-id",
            nargs="*",
            type=int,
            help=
            "Specify up ms ids, support multiple ids. Only use in cloud cluster."
        )
        group.add_argument(
            "--recycle-id",
            nargs="*",
            type=int,
            help=
            "Specify up recycle ids, support multiple ids. Only use in cloud cluster."
        )
        group.add_argument(
            "--fdb-id",
            nargs="*",
            type=int,
            help=
            "Specify up fdb ids, support multiple ids. Only use in cloud cluster."
        )

    def _get_parser_bool_action(self, is_store_true):
        """Return the argparse action to use for a boolean option.

        argparse.BooleanOptionalAction is used whenever it is available;
        otherwise fall back to "store_true" / "store_false" depending on
        is_store_true.
        """
        if self._support_boolean_action():
            return argparse.BooleanOptionalAction
        return "store_true" if is_store_true else "store_false"

    def _support_boolean_action(self):
        """Return True when argparse.BooleanOptionalAction exists (3.9+)."""
        # Tuple comparison also stays correct for any future major version.
        return sys.version_info >= (3, 9)

    def _print_table(self, header, datas):
        """Render a table, or return its rows when logs are not on stdout.

        Args:
            header: list of column titles.
            datas: list of rows.

        Returns:
            "" after printing a pretty table when utils.is_log_stdout() is
            true; otherwise a new list made of header followed by the rows.
        """
        if utils.is_log_stdout():
            table = prettytable.PrettyTable(
                [utils.render_green(field) for field in header])
            for row in datas:
                table.add_row(row)
            print(table)
            return ""
        # Build a new list instead of inserting into the caller's one, so the
        # argument is not mutated (and a second call cannot duplicate header).
        return [header] + list(datas)


class SimpleCommand(Command):
    """Sub command that maps directly onto one docker compose command.

    The command is applied to the nodes selected with the --*-id options, or
    to all containers when none of these options is given.
    """

    def __init__(self, command, help, options=None):
        """
        Args:
            command: docker compose command name; also used as the sub
                command name.
            help: help text shown for the sub command.
            options: extra options passed to the docker compose command.
        """
        super().__init__(command)
        self.command = command
        self.help = help
        # A `[]` default would be one list shared by every instance; take a
        # private copy so instances never share or mutate a caller's list.
        self.options = [] if options is None else list(options)

    def add_parser(self, args_parsers):
        """Register the sub command with NAME, the id options and the common options.

        Returns:
            The created parser, so subclasses can add more arguments.
        """
        help_text = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\
                "then apply to all containers."
        parser = args_parsers.add_parser(self.command, help=help_text)
        parser.add_argument("NAME", help="Specify cluster name.")
        self._add_parser_ids_args(parser)
        self._add_parser_common_args(parser)
        return parser

    def run(self, args):
        """Run the docker compose command on the selected nodes.

        Returns:
            A tuple (cluster, related_nodes); related_nodes holds every node
            of the cluster when the command applied to all of them.
        """
        cluster = CLUSTER.Cluster.load(args.NAME)
        for_all, related_nodes, related_node_num = get_ids_related_nodes(
            cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
            args.fdb_id)
        utils.exec_docker_compose_command(cluster.get_compose_file(),
                                          self.command,
                                          options=self.options,
                                          nodes=related_nodes)
        # Upper-case only the first letter; the rest of the name is kept.
        show_cmd = self.command[0].upper() + self.command[1:]

        if for_all:
            # related_nodes may be None here (no id option given); hand the
            # caller the real node list instead.
            related_nodes = cluster.get_all_nodes()

        LOG.info(
            utils.render_green("{} succ, total related node num {}".format(
                show_cmd, related_node_num)))

        return cluster, related_nodes


class StartBaseCommand(SimpleCommand):
    """Shared base of the commands that bring containers up.

    Adds a --wait-timeout option and, after the docker compose command has
    run, waits for the related fe/be to become alive.
    """

    def add_parser(self, args_parsers):
        """Extend the parent parser with --wait-timeout and return it."""
        wait_help = (
            "Specify wait seconds for fe/be ready for service: 0 not wait (default), "
            "> 0 max wait seconds, -1 wait unlimited.")
        parser = super().add_parser(args_parsers)
        parser.add_argument("--wait-timeout",
                            type=int,
                            default=0,
                            help=wait_help)
        return parser

    def run(self, args):
        """Run the command, then wait for the related fe/be to be alive.

        The wait is skipped when the cluster uses the host network.

        Returns:
            The (cluster, related_nodes) tuple of the parent run().
        """
        result = super().run(args)
        cluster, related_nodes = result

        def ids_where(predicate):
            return [node.id for node in related_nodes if predicate(node)]

        fe_ids = ids_where(lambda node: node.is_fe())
        be_ids = ids_where(lambda node: node.is_be())
        if cluster.is_host_network():
            return cluster, related_nodes
        wait_service(True, args.wait_timeout, cluster, fe_ids, be_ids)
        return cluster, related_nodes


class StartCommand(StartBaseCommand):
    """Sub command that starts the doris containers."""

    def __init__(self, command):
        # No trailing comma here: it would turn the call into a throw-away
        # one-element tuple expression.
        super().__init__(command, "Start the doris containers. ")


class RestartCommand(StartBaseCommand):
    """Sub command that restarts the doris containers."""

    def __init__(self, command):
        # "-t 1" is handed over as extra options of the docker compose
        # command. No trailing comma: it would turn the call into a
        # throw-away one-element tuple expression.
        super().__init__(command, "Restart the doris containers. ",
                         ["-t", "1"])


class StopCommand(SimpleCommand):
    """Sub command that stops the doris containers.

    Adds a --wait-timeout option and, after the docker compose command has
    run, waits for the related fe/be to become dead.
    """

    def __init__(self, command):
        # "-t 1" is handed over as extra options of the docker compose
        # command. No trailing comma: it would turn the call into a
        # throw-away one-element tuple expression.
        super().__init__(command, "Stop the doris containers. ", ["-t", "1"])

    def add_parser(self, args_parsers):
        """Extend the parent parser with --wait-timeout and return it."""
        parser = super().add_parser(args_parsers)
        parser.add_argument(
            "--wait-timeout",
            type=int,
            default=0,
            help=
            "Specify wait seconds for fe/be close for service: 0 not wait (default), "\
            "> 0 max wait seconds, -1 wait unlimited."
        )
        return parser

    def run(self, args):
        """Run the command, then wait for the related fe/be to be dead.

        The wait is skipped when the cluster uses the host network.

        Returns:
            The (cluster, related_nodes) tuple of the parent run().
        """
        cluster, related_nodes = super().run(args)
        fe_ids = [node.id for node in related_nodes if node.is_fe()]
        be_ids = [node.id for node in related_nodes if node.is_be()]
        if not cluster.is_host_network():
            wait_service(False, args.wait_timeout, cluster, fe_ids, be_ids)
        return cluster, related_nodes


class UpCommand(Command):

    def add_parser(self, args_parsers):
        """Register the "up" sub command and all of its options.

        Args:
            args_parsers: the argparse sub-parsers object to add "up" to.

        Returns:
            The parser created for the "up" sub command.
        """

        def add_default_true_flag(target, flag, help_optional, help_negative):
            """Add a boolean option named `flag` whose default is True.

            When argparse.BooleanOptionalAction is available the option is
            registered as --<flag> and described by help_optional; otherwise
            only --no-<flag> is registered (same dest, stores False) and
            described by help_negative.
            """
            if self._support_boolean_action():
                target.add_argument("--" + flag,
                                    default=True,
                                    action=self._get_parser_bool_action(False),
                                    help=help_optional)
            else:
                target.add_argument("--no-" + flag,
                                    dest=flag.replace("-", "_"),
                                    default=True,
                                    action=self._get_parser_bool_action(False),
                                    help=help_negative)

        parser = args_parsers.add_parser(
            "up",
            help="Create and upgrade doris containers, "
            "or add new containers. "
            "If none of --add-fe-num, --add-be-num, --add-ms-num, --add-recycle-num, "
            "--fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "
            "then apply to all containers.")
        parser.add_argument("NAME", default="", help="Specific cluster name.")
        parser.add_argument("IMAGE",
                            default="",
                            nargs="?",
                            help="Specify docker image.")

        self._add_parser_common_args(parser)
        parser.add_argument(
            "--cloud",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "Create cloud cluster, default is false. Only use when creating new cluster."
        )
        parser.add_argument(
            "--root",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "Run cluster as root user, default is false, it will run as host user."
        )

        parser.add_argument(
            "--wait-timeout",
            type=int,
            default=0,
            help=
            "Specify wait seconds for fe/be ready for service: 0 not wait (default), "
            "> 0 max wait seconds, -1 wait unlimited.")

        # Options describing the nodes to add.
        group1 = parser.add_argument_group("add new nodes",
                                           "add cluster nodes.")
        group1.add_argument(
            "--add-fe-num",
            type=int,
            help=
            "Specify add fe num, default: 3 for a new cluster, 0 for a existing cluster."
        )
        group1.add_argument(
            "--add-be-num",
            type=int,
            help=
            "Specify add be num, default: 3 for a new cluster, 0 for a existing cluster."
        )
        group1.add_argument(
            "--add-ms-num",
            type=int,
            help=
            "Specify add ms num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
        )
        group1.add_argument(
            "--add-recycle-num",
            type=int,
            help=
            "Specify add recycle num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
        )
        group1.add_argument(
            "--fe-config",
            nargs="*",
            type=str,
            help="Specify fe configs for fe.conf. "
            "Example: --fe-config \"enable_debug_points = true\" \"sys_log_level = ERROR\"."
        )
        group1.add_argument(
            "--be-config",
            nargs="*",
            type=str,
            help="Specify be configs for be.conf. "
            "Example: --be-config \"enable_debug_points = true\" \"enable_auth = true\"."
        )
        group1.add_argument(
            "--ms-config",
            nargs="*",
            type=str,
            help="Specify ms configs for doris_cloud.conf. "
            "Example: --ms-config \"log_level = warn\".")
        group1.add_argument(
            "--recycle-config",
            nargs="*",
            type=str,
            help="Specify recycle configs for doris_cloud.conf. "
            "Example: --recycle-config \"log_level = warn\".")

        group1.add_argument(
            "--fe-follower",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "The new added fe is follower but not observer. Only support in cloud mode."
        )
        # A space was missing between the example and "means" in this help.
        group1.add_argument(
            "--be-disks",
            nargs="*",
            default=["HDD=1"],
            type=str,
            help="Specify each be disks, each group is \"disk_type=disk_num[,disk_capactity]\", "
            "disk_type is HDD or SSD, disk_capactity is capactity limit in gb. default: HDD=1. "
            "Example: --be-disks \"HDD=1\", \"SSD=1,10\", \"SSD=2,100\" "
            "means each be has 1 HDD without capactity limit, 1 SSD with 10GB capactity limit, "
            "2 SSD with 100GB capactity limit")
        group1.add_argument(
            "--be-cluster",
            type=str,
            help=
            "be cluster name, if not specific, will use compute_cluster. Only use in cloud cluster."
        )

        self._add_parser_ids_args(parser)

        # --start/--no-start and --force-recreate cannot be combined.
        group2 = parser.add_mutually_exclusive_group()
        add_default_true_flag(
            group2, "start",
            "Start containers, default is true. If specific --no-start, "
            "will create or update config image only but not start containers.",
            "Create or update config image only and don't start containers.")
        group2.add_argument(
            "--force-recreate",
            default=False,
            action=self._get_parser_bool_action(True),
            help="Recreate containers even if their configuration "
            "and image haven't changed. ")

        parser.add_argument(
            "--extra-hosts",
            nargs="*",
            type=str,
            help=
            "Add custom host-to-IP mappings (host:ip). For example: --extra-hosts myhost1:192.168.10.1 myhost2:192.168.10.2 . Only use when creating new cluster."
        )

        parser.add_argument(
            "--env",
            nargs="*",
            type=str,
            help=
            "Add environment variables. For example: --env KEY1=VALUE1 KEY2=VALUE2. Only use when creating new cluster."
        )

        parser.add_argument("--coverage-dir",
                            default="",
                            help="Set code coverage output directory")

        parser.add_argument("--sql-mode-node-mgr",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Manager fe be via sql instead of http")

        parser.add_argument(
            "--remote-master-fe",
            type=str,
            help=
            "Specify remote master fe address with ip:query_port, and all the container use host network. "
            "Only use when creating new cluster.")

        parser.add_argument(
            "--local-network-ip",
            type=str,
            help=
            "Specify local network ip, no need specify, will auto chose a proper ip. "
            "Only use when creating new cluster and specify --remote-master-fe."
        )

        parser.add_argument(
            "--external-ms",
            type=str,
            help="Use external meta service cluster (specify cluster name). "
            "This cluster will not create its own MS/FDB/Recycler, but use the specified cluster's services. "
            "The external cluster must be a cloud cluster with MS/FDB already running. "
            "Example: --external-ms shared-meta. Only use when creating new cloud cluster."
        )

        parser.add_argument(
            "--instance-id",
            type=str,
            help=
            "Specify instance ID for cloud mode. If not specified, will auto-generate 'default_instance_id'. "
            "When using external MS with multiple clusters, each cluster should have a unique instance ID. "
            "Example: --instance-id prod_instance_1")

        parser.add_argument(
            "--cluster-snapshot",
            type=str,
            help=
            "Cluster snapshot JSON content for FE-1 first startup in cloud mode only. "
            "The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh "
            "with --cluster_snapshot parameter. Only effective on first startup. "
            "Example: --cluster-snapshot '{\"instance_id\":\"instance_id_xxx\"}'"
        )

        # The positive form defaults to True, so its help must describe
        # setting the endpoint; only the --no- form means "do not set".
        add_default_true_flag(
            parser, "be-metaservice-endpoint",
            "Set BE meta service endpoint in conf, default is true. "
            "If --no-be-metaservice-endpoint is specified, "
            "will not set BE meta service endpoint in conf.",
            "Do not set BE meta service endpoint in conf. Default is False.")

        # if default==False, use this style to parser, like --be-cluster-id
        parser.add_argument(
            "--be-cluster-id",
            default=False,
            action=self._get_parser_bool_action(True),
            help="Do not set BE cluster ID in conf. Default is False.")

        parser.add_argument(
            "--fdb-version",
            type=str,
            default="7.1.26",
            help="fdb image version. Only use in cloud cluster.")

        parser.add_argument("--tde-ak", type=str, default="", help="tde ak")

        parser.add_argument("--tde-sk", type=str, default="", help="tde sk")

        # if default==True, use this style to parser, like --detach
        add_default_true_flag(
            parser, "detach",
            "Detached mode: Run containers in the background. If specific --no-detach, "
            "will run containers in frontend. ",
            "Run containers in frontend. ")

        add_default_true_flag(
            parser, "reg-be",
            "Register be to meta server in cloud mode, use for multi clusters test. If specific --no-reg-be, "
            "will not register be to meta server. ",
            "Don't register be to meta server in cloud mode, use for multi clusters test"
        )

        # Returned for consistency with SimpleCommand.add_parser.
        return parser

    def run(self, args):
        if not args.NAME:
            raise Exception("Need specific not empty cluster name")
        for_all = True
        add_fdb_num = 0
        is_new_cluster = False
        try:
            cluster = CLUSTER.Cluster.load(args.NAME)

            if not cluster.is_cloud:
                args.add_ms_num = None
                args.add_recycle_num = None
                args.ms_id = None
                args.recycle_id = None
                args.fdb_id = None

            if args.fe_id != None or args.be_id != None \
                or args.ms_id != None or args.recycle_id != None or args.fdb_id != None \
                or args.add_fe_num or args.add_be_num \
                or args.add_ms_num or args.add_recycle_num:
                for_all = False
        except:
            # a new cluster
            is_new_cluster = True
            if not args.IMAGE:
                raise Exception("New cluster must specific image") from None
            if args.fe_id != None:
                args.fe_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --fe-id for new cluster"))
            if args.be_id != None:
                args.be_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --be-id for new cluster"))

            args.fdb_id = None
            args.ms_id = None
            args.recycle_id = None

            if args.add_fe_num is None:
                args.add_fe_num = 0 if args.remote_master_fe else 3
            if args.add_be_num is None:
                args.add_be_num = 3

            cloud_store_config = {}
            if args.cloud:
                external_ms_cluster = getattr(args, 'external_ms', None)
                if external_ms_cluster:
                    # Using the MS nodes from external cluster, no need to add FDB/MS/Recycler
                    self._validate_external_ms_cluster(external_ms_cluster)
                    add_fdb_num = 0
                    args.add_ms_num = 0
                    args.add_recycle_num = 0
                    LOG.info(f"Using external MS cluster: {external_ms_cluster}")
                else:
                    add_fdb_num = 1
                    if not args.add_ms_num:
                        args.add_ms_num = 1
                    if not args.add_recycle_num:
                        args.add_recycle_num = 1
                    external_ms_cluster = None

                if not args.be_cluster:
                    args.be_cluster = "compute_cluster"
                cloud_store_config = self._get_cloud_store_config()
            else:
                args.add_ms_num = 0
                args.add_recycle_num = 0
                external_ms_cluster = None

            if args.remote_master_fe:
                if not args.local_network_ip:
                    args.local_network_ip = utils.get_local_ip()
                parts = args.remote_master_fe.split(":")
                if len(parts) != 2:
                    raise Exception(
                        f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'"
                    )
                if not parts[0]:
                    args.remote_master_fe = args.local_network_ip + ":" + parts[
                        1]
                if args.cloud:
                    args.sql_mode_node_mgr = True

            instance_id = getattr(args, 'instance_id', None)
            cluster_snapshot = getattr(args, 'cluster_snapshot', '')

            cluster = CLUSTER.Cluster.new(
                args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
                args.be_config, args.ms_config, args.recycle_config,
                args.remote_master_fe, args.local_network_ip, args.fe_follower,
                args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts, args.env,
                args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
                args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk,
                external_ms_cluster, instance_id, cluster_snapshot)
            LOG.info("Create new cluster {} succ, cluster path is {}".format(
                args.NAME, cluster.get_path()))

        if args.be_cluster and cluster.is_cloud:
            cluster.be_cluster = args.be_cluster

        if cluster.is_cloud:
            cluster.fe_follower = args.fe_follower

        _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                    args.be_id, args.ms_id,
                                                    args.recycle_id,
                                                    args.fdb_id)
        add_fe_ids = []
        add_be_ids = []
        add_ms_ids = []
        add_recycle_ids = []
        add_fdb_ids = []

        add_type_nums = [
            (CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
            (CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
            (CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
            (CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
            (CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
        ]

        if not related_nodes:
            related_nodes = []

        def do_add_node(node_type, add_num, add_ids):
            if not add_num:
                return
            for i in range(add_num):
                node = cluster.add(node_type)
                related_nodes.append(node)
                add_ids.append(node.id)

        for node_type, add_num, add_ids in add_type_nums:
            do_add_node(node_type, add_num, add_ids)

        if args.IMAGE:
            for node in related_nodes:
                node.set_image(args.IMAGE)
            if for_all:
                cluster.set_image(args.IMAGE)

        for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
            node.set_image("foundationdb/foundationdb:{}".format(
                args.fdb_version))

        cluster.save()

        options = []
        if not args.start:
            options.append("--no-start")
        else:
            options += ["--remove-orphans"]
            if args.detach:
                options.append("-d")
            if args.force_recreate:
                options.append("--force-recreate")

        related_node_num = len(related_nodes)
        if for_all:
            related_node_num = cluster.get_all_nodes_num()
            related_nodes = None

        output_real_time = args.start and not args.detach
        utils.exec_docker_compose_command(cluster.get_compose_file(),
                                          "up",
                                          options,
                                          related_nodes,
                                          output_real_time=output_real_time)

        if not args.start:
            LOG.info(
                utils.render_green(
                    "Not up cluster cause specific --no-start, related node num {}"
                    .format(related_node_num)))
        else:
            LOG.info("Using SQL mode for node management ? {}".format(
                cluster.sql_mode_node_mgr))

            if cluster.remote_master_fe:
                if is_new_cluster:
                    with open(CLUSTER.get_master_fe_addr_path(cluster.name),
                              "w") as f:
                        f.write(cluster.remote_master_fe)
                    if cluster.is_cloud:
                        cloud_config = "\n".join([
                            f"meta_service_endpoint = {cluster.get_meta_server_addr()}",
                            "deploy_mode = cloud",
                            f"cluster_id = {CLUSTER.CLUSTER_ID}",
                        ])
                        # write add conf to remote_master_fe_add.conf, remote fe can send ssh to get this content.
                        with open(
                                os.path.join(
                                    CLUSTER.get_status_path(cluster.name),
                                    "remote_master_fe_add.conf"), "w") as f:
                            f.write(cloud_config)
                        ans = input(
                            utils.render_red(
                                f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: "
                            ) + "\n\n" + f"{cloud_config}\n\nConfirm ?  y/n: ")
                        if ans != 'y':
                            LOG.info(
                                "Up cluster failed due to not confirm write the above config."
                            )
                            return

                        LOG.info("Waiting connect to remote FE...")
                        expire_ts = time.time() + 3600 * 5
                        parts = cluster.remote_master_fe.split(":")
                        fe_ip = parts[0]
                        fe_port = int(parts[1])
                        ready = False
                        while expire_ts > time.time():
                            if utils.is_socket_avail(fe_ip, fe_port):
                                ready = True
                                break
                        if not ready:
                            raise Exception(
                                "Cannot connect to remote master fe: " +
                                cluster.remote_master_fe)

                        LOG.info("After connect to remote FE...")
            else:
                # Wait for FE master to be elected
                LOG.info("Waiting for FE master to be elected...")
                expire_ts = time.time() + 30
                while expire_ts > time.time():
                    ready = False
                    db_mgr = database.get_db_mgr(
                        args.NAME, cluster.get_all_node_net_infos(), False)
                    for id in add_fe_ids:
                        fe_state = db_mgr.get_fe(id)
                        if fe_state is not None and fe_state.alive:
                            ready = True
                            break
                    if ready:
                        break
                    LOG.info("there is no fe ready")
                    time.sleep(1)
                LOG.info("after Waiting for FE master to be elected...")
            if cluster.is_cloud and cluster.sql_mode_node_mgr:
                db_mgr = database.get_db_mgr(args.NAME,
                                             cluster.get_all_node_net_infos(),
                                             False)
                master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
                    cluster.name, True)
                # Add FEs except master_fe
                for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
                    fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}"
                    fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}"
                    if fe_querypoint != master_fe_endpoint:
                        try:
                            db_mgr.add_fe(
                                fe_endpoint, "FOLLOWER"
                                if cluster.fe_follower else "OBSERVER")
                            LOG.info(f"Added FE {fe_endpoint} successfully.")
                        except Exception as e:
                            LOG.error(
                                f"Failed to add FE {fe_endpoint}: {str(e)}")

                # Add BEs
                for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
                    be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}"
                    try:
                        db_mgr.add_be(be_endpoint)
                        LOG.info(f"Added BE {be_endpoint} successfully.")
                    except Exception as e:
                        LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
                if is_new_cluster:
                    cloud_store_config = self._get_cloud_store_config()
                    db_mgr.create_default_storage_vault(cloud_store_config)

            if not cluster.is_host_network():
                wait_service(True, args.wait_timeout, cluster, add_fe_ids,
                             add_be_ids)
            LOG.info(
                utils.render_green(
                    "Up cluster {} succ, related node num {}".format(
                        args.NAME, related_node_num)))

        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
        LOG.info(
            "Master fe query address: " +
            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
            "\n")
        return {
            "fe": {
                "add_list": add_fe_ids,
            },
            "be": {
                "add_list": add_be_ids,
            },
            "ms": {
                "add_list": add_ms_ids,
            },
            "recycle": {
                "add_list": add_recycle_ids,
            },
            "fdb": {
                "add_list": add_fdb_ids,
            },
        }

    def _validate_external_ms_cluster(self, external_ms_cluster_name):
        """Verify that another cluster can act as the external MS provider.

        The checks run in order and the first failure raises:
        the cluster meta must load, it must be a cloud cluster, it must own
        at least one MS node and one FDB node, and a container of each of
        those two types must currently be running.

        Raises:
            Exception: describing the failed check and how to fix it.
        """
        # Check 1: the cluster must exist (its meta must be loadable).
        try:
            external_cluster = CLUSTER.Cluster.load(external_ms_cluster_name)
        except Exception as e:
            not_found_msg = (
                f"External MS cluster '{external_ms_cluster_name}' not found. "
                f"Please create it first with: "
                f"python doris-compose.py up {external_ms_cluster_name} <image> --cloud --add-fe-num 0 --add-be-num 0"
            )
            raise Exception(not_found_msg) from e

        # Check 2: only cloud clusters carry MS / FDB.
        if not external_cluster.is_cloud:
            raise Exception(
                f"External MS cluster '{external_ms_cluster_name}' is not a cloud cluster. "
                f"Only cloud clusters can be used as external MS."
            )

        # Check 3: MS and FDB nodes must be part of the cluster definition.
        ms_group = external_cluster.get_group(CLUSTER.Node.TYPE_MS)
        fdb_group = external_cluster.get_group(CLUSTER.Node.TYPE_FDB)

        if ms_group.get_node_num() == 0:
            raise Exception(
                f"External MS cluster '{external_ms_cluster_name}' has no MS nodes. "
                f"Please add MS nodes first."
            )

        if fdb_group.get_node_num() == 0:
            raise Exception(
                f"External MS cluster '{external_ms_cluster_name}' has no FDB nodes. "
                f"Please add FDB nodes first."
            )

        # Check 4: a container of each type must be running. Collect the node
        # type of every running container, then test membership.
        containers = utils.get_doris_running_containers(external_ms_cluster_name)
        running_types = set()
        for container_name in containers.keys():
            _, node_type, _ = utils.parse_service_name(container_name)
            running_types.add(node_type)

        if CLUSTER.Node.TYPE_MS not in running_types:
            raise Exception(
                f"External MS cluster '{external_ms_cluster_name}' MS node is not running. "
                f"Please start it with: python doris-compose.py start {external_ms_cluster_name}"
            )

        if CLUSTER.Node.TYPE_FDB not in running_types:
            raise Exception(
                f"External MS cluster '{external_ms_cluster_name}' FDB node is not running. "
                f"Please start it with: python doris-compose.py start {external_ms_cluster_name}"
            )

        passed_msg = (
            f"✓ External MS cluster '{external_ms_cluster_name}' validation passed: "
            f"MS={external_cluster.get_meta_server_addr()}, "
            f"FDB={external_cluster.get_fdb_cluster()}"
        )
        LOG.info(utils.render_green(passed_msg))

    def _get_cloud_store_config(self):
        """Load the cloud store settings needed by a cloud cluster.

        The required keys are the ones listed in ``cloud.ini.example`` under
        ``CLUSTER.LOCAL_RESOURCE_PATH``; their values are read from the file
        named by ``CLUSTER.CLOUD_CFG_FILE``. Keys in that file which the
        example does not list are ignored.

        Returns:
            dict mapping every key of the example file to its non-empty value.

        Raises:
            Exception: if ``CLUSTER.CLOUD_CFG_FILE`` is unset, the file does
                not exist, or a required key is missing or empty.
        """
        example_cfg_file = os.path.join(CLUSTER.LOCAL_RESOURCE_PATH,
                                        "cloud.ini.example")
        if not CLUSTER.CLOUD_CFG_FILE:
            raise Exception(
                "Cloud cluster need S3 store, specific its config in a file.\n"
                "A example file is " + example_cfg_file + ".\n"
                "Then setting the env variable  `export DORIS_CLOUD_CFG_FILE=<cfg-file-path>`."
            )

        if not os.path.exists(CLUSTER.CLOUD_CFG_FILE):
            raise Exception("Cloud store config file '" +
                            CLUSTER.CLOUD_CFG_FILE + "' not exists.")

        def iter_properties(path):
            """Yield (key, value) for every `key = value` line of *path*."""
            with open(path, "r") as f:
                for raw_line in f:
                    # Strip before the comment test so indented comments are
                    # skipped too instead of being parsed as a property.
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue
                    key, sep, value = line.partition('=')
                    key = key.strip()
                    # Lines without '=' or with an empty key are not properties.
                    if sep and key:
                        yield key, value.strip()

        # The example file only defines WHICH keys are required.
        config = {key: "" for key, _ in iter_properties(example_cfg_file)}

        # The user's file supplies the values; unknown keys are dropped.
        for key, value in iter_properties(CLUSTER.CLOUD_CFG_FILE):
            if key in config:
                config[key] = value

        for key, value in config.items():
            if not value:
                raise Exception(
                    "Should provide none empty property '{}' in file {}".
                    format(key, CLUSTER.CLOUD_CFG_FILE))
        return config


class DownCommand(Command):
    """`down` sub-command: stop and remove a cluster or some of its nodes.

    With no node id option the whole cluster is taken down (compose `down`,
    docker network removal, optional data cleanup). With node ids, only FE
    and BE nodes are supported: each one is dropped (or decommissioned) via
    the db manager, removed from compose and deleted from the cluster meta.
    """

    def add_parser(self, args_parsers):
        """Register the `down` sub-parser and its options."""
        parser = args_parsers.add_parser(
            "down",
            help=("Down doris containers, networks. "
                  "It will also remove node from DB. "
                  "If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "
                  "then apply to all containers."))
        parser.add_argument("NAME", help="Specify cluster name")
        self._add_parser_ids_args(parser)
        self._add_parser_common_args(parser)
        parser.add_argument(
            "--clean",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "Clean container related files, include expose data, config and logs."
        )
        parser.add_argument(
            "--drop-force",
            default=None,
            action=self._get_parser_bool_action(True),
            help=("Drop doris node force. For be, if specific --drop-force, "
                  "it will send dropp to fe, otherwise send decommission to fe."))

    def run(self, args):
        """Execute the `down` command.

        Returns:
            The string "down cluster succ".

        Raises:
            Exception: if the cluster cannot be loaded while specific node ids
                were requested, or if a requested node is neither FE nor BE.
        """
        cluster_name = args.NAME
        cluster = None
        stop_grace = False

        try:
            cluster = CLUSTER.Cluster.load(cluster_name)
            for_all, related_nodes, related_node_num = get_ids_related_nodes(
                cluster,
                args.fe_id,
                args.be_id,
                args.ms_id,
                args.recycle_id,
                args.fdb_id,
                ignore_not_exists=True)
            stop_grace = cluster.coverage_dir
        except Exception:
            # The cluster meta could not be used. A whole-cluster teardown can
            # still proceed best-effort, but a request for specific nodes
            # cannot. Every id option (including --fdb-id) must be considered,
            # otherwise a node-scoped request would tear down everything.
            for_all = not (args.fe_id or args.be_id or args.ms_id
                           or args.recycle_id or args.fdb_id)
            related_nodes = []
            related_node_num = 0
            if not for_all:
                raise

        LOG.info("down cluster " + args.NAME + " for all " +
                 str(for_all).lower())

        if for_all:
            compose_file = CLUSTER.get_compose_file(cluster_name)
            if os.path.exists(compose_file):
                try:
                    options = ["-v", "--remove-orphans"]
                    # Only clusters with a coverage dir keep the default stop
                    # timeout; all others pass "-t 1".
                    if not stop_grace:
                        options.extend(["-t", "1"])
                    utils.exec_docker_compose_command(compose_file,
                                                      "down",
                                                      options=options)
                except Exception as e:
                    LOG.warning("down cluster has exception: " + str(e))
            try:
                utils.remove_docker_network(cluster_name)
            except Exception as e:
                LOG.warning("remove network has exception: " + str(e))
            if args.clean:
                cluster_path = CLUSTER.get_cluster_path(cluster_name)
                if os.path.exists(cluster_path):
                    utils.enable_dir_with_rw_perm(cluster_path)
                    shutil.rmtree(cluster_path)
                LOG.info(
                    utils.render_yellow(
                        "Clean cluster data cause has specific --clean"))
        else:
            db_mgr = database.get_db_mgr(cluster.name,
                                         cluster.get_all_node_net_infos())

            for node in related_nodes:
                # First tell the cluster's db manager the node is going away.
                if node.is_fe():
                    fe_endpoint = "{}:{}".format(
                        node.get_ip(), node.meta["ports"]["edit_log_port"])
                    db_mgr.drop_fe(fe_endpoint)
                elif node.is_be():
                    be_endpoint = "{}:{}".format(
                        node.get_ip(),
                        node.meta["ports"]["heartbeat_service_port"])
                    if args.drop_force:
                        db_mgr.drop_be(be_endpoint)
                    else:
                        db_mgr.decommission_be(be_endpoint)
                else:
                    raise Exception("Unknown node type: {}".format(
                        node.node_type()))

                # Then stop and remove its container and volumes.
                utils.exec_docker_compose_command(cluster.get_compose_file(),
                                                  "rm", ["-s", "-v", "-f"],
                                                  nodes=[node])
                if args.clean:
                    node_path = node.get_path()
                    # Same guard as the whole-cluster branch: a directory that
                    # is already gone must not abort the command.
                    if os.path.exists(node_path):
                        utils.enable_dir_with_rw_perm(node_path)
                        shutil.rmtree(node_path)
                    register_file = "{}/{}-{}-register".format(
                        CLUSTER.get_status_path(cluster.name),
                        node.node_type(), node.id)
                    if os.path.exists(register_file):
                        os.remove(register_file)
                    LOG.info(
                        utils.render_yellow(
                            "Clean {} with id {} data cause has specific --clean"
                            .format(node.node_type(), node.id)))

                # Persist after every node so a later failure does not leave
                # already-removed nodes in the cluster meta.
                cluster.remove(node.node_type(), node.id)
                cluster.save()

        LOG.info(
            utils.render_green(
                "Down cluster {} succ, related node num {}".format(
                    cluster_name, related_node_num)))

        return "down cluster succ"


class ListNode(object):
    """One row of the node table printed by the `ls` command.

    All display fields start as empty strings (``id`` as 0) and are filled in
    by the caller and by :meth:`update_db_info`.
    """

    def __init__(self):
        self.node_type = ""
        self.id = 0
        self.backend_id = ""
        self.cluster_name = ""
        self.ip = ""
        self.status = ""
        self.container_id = ""
        self.image = ""
        self.created = ""
        self.alive = ""
        self.is_master = ""
        self.tablet_num = ""
        self.last_heartbeat = ""
        self.err_msg = ""
        self.query_port = ""
        self.http_port = ""
        self.heartbeat_port = ""
        self.edit_log_port = ""

    def info(self, detail):
        """Return this node's table row as a list.

        Args:
            detail: when true, append query port, http port, node path,
                edit log port and heartbeat port to the 13 base columns.
        """
        result = [
            self.cluster_name, "{}-{}".format(self.node_type, self.id),
            self.ip, self.status, self.container_id, self.image, self.created,
            self.alive, self.is_master, self.backend_id, self.tablet_num,
            self.last_heartbeat, self.err_msg
        ]
        if detail:
            node_path = CLUSTER.get_node_path(self.cluster_name,
                                              self.node_type, self.id)
            result += [
                self.query_port, self.http_port, node_path, self.edit_log_port,
                self.heartbeat_port
            ]
        return result

    def update_db_info(self, cluster, db_mgr):
        """Fill the state fields from the db manager and the cluster meta.

        Args:
            cluster: the loaded cluster, or None when its meta is unavailable.
            db_mgr: source of FE / BE state (``get_fe`` / ``get_be``).
        """
        try:
            node = cluster.get_node(self.node_type, self.id, True)
        except Exception:
            # Best effort: `cluster` may be None or may not know this node.
            # Only ordinary errors are swallowed, so Ctrl-C still interrupts.
            node = None
        ports = node.meta["ports"] if node else {}
        if self.node_type == CLUSTER.Node.TYPE_FE:
            fe = db_mgr.get_fe(self.id)
            if fe:
                self.alive = str(fe.alive).lower()
                self.is_master = str(fe.is_master).lower()
                self.last_heartbeat = fe.last_heartbeat
                self.err_msg = fe.err_msg
                self.query_port = fe.query_port
                self.http_port = fe.http_port
                self.edit_log_port = fe.edit_log_port
        elif self.node_type == CLUSTER.Node.TYPE_BE:
            # -1 is shown for a BE the db manager has no state for.
            self.backend_id = -1
            be = db_mgr.get_be(self.id)
            if be:
                self.alive = str(be.alive).lower()
                self.backend_id = be.backend_id
                self.tablet_num = be.tablet_num
                self.last_heartbeat = be.last_heartbeat
                self.err_msg = be.err_msg
                self.http_port = be.http_port
                self.heartbeat_port = be.heartbeat_service_port
        elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE:
            if ports:
                self.http_port = ports.get("brpc_listen_port", -1)
        # Remote nodes have no local container; show placeholders instead.
        if node and node.meta.get("is_remote", False):
            self.ip = node.get_ip()
            self.container_id = "<remote>"
            self.image = "<remote>"


class GenConfCommand(Command):
    """`config` sub-command: generate regression-conf-custom.groovy.

    The generated settings are written between two marker comment lines at
    the top of the file; everything in the existing file outside a previous
    marker section is preserved after it.
    """

    def print_use_time(self):
        return False

    def add_parser(self, args_parsers):
        """Register the `config` sub-parser and return it."""
        parser = args_parsers.add_parser(
            "config",
            help="Generate regression-conf-custom.groovy for regression test.")
        parser.add_argument("NAME", default="", help="Specific cluster name.")
        parser.add_argument("DORIS_ROOT_PATH", default="", help="Specify doris or selectdb root path, "\
                "i.e. the parent directory of regression-test.")
        parser.add_argument("--connect-follow-fe",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Connect to follow fe.")
        parser.add_argument("-q",
                            "--quiet",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="write config quiet, no need confirm.")

        return parser

    def run(self, args):
        """Write the regression custom config for cluster ``args.NAME``.

        Picks the first alive FE that is the master (or, with
        ``--connect-follow-fe``, the first alive non-master), asks for
        confirmation unless ``--quiet`` is given, then rewrites the file.

        Raises:
            Exception: if no alive FE with the requested role is found.
        """
        base_conf = '''
jdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true"
feSourceThriftAddress = "{fe_ip}:{rpc_port}"
feTargetThriftAddress = "{fe_ip}:{rpc_port}"
syncerAddress = "{fe_ip}:9190"
feHttpAddress = "{fe_ip}:{http_port}"
'''

        cloud_conf = '''
feCloudHttpAddress = "{fe_ip}:18030"
metaServiceHttpAddress = "{ms_endpoint}"
metaServiceToken = "greedisgood9999"
recycleServiceHttpAddress = "{recycle_endpoint}"
instanceId = "12345678"
multiClusterInstance = "12345678"
multiClusterBes = "{multi_cluster_bes}"
cloudUniqueId= "{fe_cloud_unique_id}"
'''
        cluster = CLUSTER.Cluster.load(args.NAME)
        master_fe_ip_ep = CLUSTER.get_master_fe_endpoint(args.NAME)
        if not master_fe_ip_ep:
            print("Not found cluster with name {} in directory {}".format(
                args.NAME, CLUSTER.LOCAL_DORIS_PATH))
            return

        db_mgr = database.get_db_mgr(args.NAME,
                                     cluster.get_all_node_net_infos(), False)
        fe_ip = ""
        rpc_port = 0
        query_port = 0
        http_port = 0
        for fe in db_mgr.get_all_fe().values():
            if not fe.alive:
                continue
            # Master wanted by default; a non-master when --connect-follow-fe.
            if fe.is_master == (not args.connect_follow_fe):
                fe_ip = fe.ip
                rpc_port = fe.rpc_port
                query_port = fe.query_port
                http_port = fe.http_port
                break
        if not fe_ip:
            role = "follower" if args.connect_follow_fe else "master"
            raise Exception(f"Not found an alive {role} fe")

        relative_custom_file_path = "regression-test/conf/regression-conf-custom.groovy"
        regression_conf_custom = os.path.join(args.DORIS_ROOT_PATH,
                                              relative_custom_file_path)
        if not args.quiet:
            ans = input(
                "\nwrite file {} ?  y/n: ".format(regression_conf_custom))
            if ans != 'y':
                print("\nNo write regression custom file.")
                return

        annotation_start = "//---------- Start auto generate by doris-compose.py---------"
        annotation_end = "//---------- End auto generate by doris-compose.py---------"

        old_contents = []
        if os.path.exists(regression_conf_custom):
            with open(regression_conf_custom, "r") as f:
                old_contents = f.readlines()

        # Render the whole file in memory before opening it for writing:
        # opening with "w" truncates, so a failure in the lookups below must
        # not be able to wipe the user's existing file.
        chunks = [
            annotation_start,
            base_conf.format(fe_ip=fe_ip,
                             rpc_port=rpc_port,
                             query_port=query_port,
                             http_port=http_port),
        ]
        if cluster.is_cloud:
            multi_cluster_bes = ",".join(
                "{}:{}:{}:{}:{}".format(
                    be.get_ip(),
                    be.meta["ports"]["heartbeat_service_port"],
                    be.meta["ports"]["webserver_port"],
                    be.cloud_unique_id(), be.meta["ports"]["brpc_port"])
                for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE))
            master_fe = cluster.get_remote_fe_node(
            ) if cluster.is_host_network() else cluster.get_node(
                CLUSTER.Node.TYPE_FE, 1)
            chunks.append(
                cloud_conf.format(
                    fe_ip=fe_ip,
                    ms_endpoint=cluster.get_meta_server_addr(),
                    recycle_endpoint=cluster.get_recycle_addr(),
                    multi_cluster_bes=multi_cluster_bes,
                    fe_cloud_unique_id=master_fe.cloud_unique_id()))
        chunks.append(annotation_end + "\n\n")

        # Carry over the hand-written part of the old file: drop any previous
        # auto-generated section and the single blank line that follows it.
        in_annotation = False
        annotation_end_line_idx = -100
        for line_idx, line in enumerate(old_contents):
            line = line.rstrip()
            if line == annotation_start:
                in_annotation = True
            elif line == annotation_end:
                in_annotation = False
                annotation_end_line_idx = line_idx
            elif not in_annotation:
                if line or line_idx != annotation_end_line_idx + 1:
                    chunks.append(line + "\n")

        with open(regression_conf_custom, "w") as f:
            f.write("".join(chunks))

        print("\nWrite succ: " + regression_conf_custom)


class ListCommand(Command):
    """`ls` sub-command.

    Without cluster names it prints one summary row per cluster; with names
    it prints one row per node of those clusters.
    """

    def add_parser(self, args_parsers):
        """Register the `ls` sub-parser and its options."""
        parser = args_parsers.add_parser(
            "ls", help="List running doris compose clusters.")
        parser.add_argument(
            "NAME",
            nargs="*",
            help=
            "Specify multiple clusters, if specific, show all their containers."
        )
        self._add_parser_common_args(parser)
        parser.add_argument("--detail",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Print more detail fields.")

    def _hint_cluster_bad(self, cluster_name):
        """Log why a cluster produced no rows (missing dir, meta, or bad meta)."""
        cluster_path = CLUSTER.get_cluster_path(cluster_name)
        if not os.path.exists(cluster_path):
            LOG.info(
                utils.render_yellow(
                    f"Not exits cluster directory in '{CLUSTER.LOCAL_DORIS_PATH}'"
                ))
        elif not os.path.exists(CLUSTER.Cluster._get_meta_file(cluster_name)):
            LOG.error(
                utils.render_red(
                    f"Not exits cluster meta file in '{cluster_path}'"))
        else:
            try:
                CLUSTER.Cluster.load(cluster_name)
            except Exception:
                LOG.error(utils.render_red("meta file is bad or incompatible with current doris-compose.py. " \
                    "Run command `down --clean` to destroy it then recreate a new one"))

    def run(self, args):
        """Collect cluster / node state and print it as a table.

        Returns:
            Whatever ``self._print_table`` returns for the printed table.
        """
        COMPOSE_MISSING = "(missing)"
        COMPOSE_BAD = "(bad)"
        COMPOSE_GOOD = ""

        SERVICE_DEAD = "dead"
        SERVICE_RUNNING = "running"

        class ComposeService(object):
            """A service known only from the compose file (no container)."""

            def __init__(self, name, ip, image):
                self.name = name
                self.ip = ip
                self.image = image

        def parse_cluster_compose_file(cluster_name):
            """Return (compose status, {service name: ComposeService})."""
            compose_file = CLUSTER.get_compose_file(cluster_name)
            try:
                cluster = CLUSTER.Cluster.load(cluster_name)
                ip_for_host_mode = cluster.local_network_ip if cluster.is_host_network(
                ) else ""
            except Exception:
                ip_for_host_mode = ""
            if not os.path.exists(compose_file):
                return COMPOSE_MISSING, {}
            try:
                compose = utils.read_compose_file(compose_file)
                if not compose:
                    return COMPOSE_BAD, {}
                services = compose.get("services", {})
                if services is None:
                    return COMPOSE_BAD, {}
                # In host network mode every service shares the cluster's
                # local ip; otherwise take the service's first network ip.
                return COMPOSE_GOOD, {
                    service: ComposeService(
                        service, ip_for_host_mode if ip_for_host_mode else
                        list(service_conf["networks"].values())[0]
                        ["ipv4_address"], service_conf["image"])
                    for service, service_conf in services.items()
                }
            except Exception:
                # Any malformed compose content is reported as "(bad)".
                return COMPOSE_BAD, {}

        clusters = {}
        search_names = args.NAME if args.NAME else CLUSTER.get_all_cluster_names(
        )

        for cluster_name in search_names:
            status, services = parse_cluster_compose_file(cluster_name)
            clusters[cluster_name] = {"status": status, "services": services}

        # Overlay real containers on top of the compose-file services, so a
        # service keeps its ComposeService entry only if it has no container.
        docker_clusters = utils.get_doris_containers(args.NAME)
        for cluster_name, containers in docker_clusters.items():
            cluster_info = clusters.get(cluster_name, None)
            if not cluster_info:
                cluster_info = {"status": COMPOSE_MISSING, "services": {}}
                clusters[cluster_name] = cluster_info
            for container in containers:
                cluster_info["services"][container.name] = container

        if not args.NAME:
            header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD",
                      "NETWORK MODE", "CONFIG FILES")
            rows = []
            for name in sorted(clusters.keys()):
                cluster_info = clusters[name]
                service_statuses = {}
                for container in cluster_info["services"].values():
                    status = SERVICE_DEAD if isinstance(
                        container, ComposeService) else container.status
                    service_statuses[status] = service_statuses.get(status,
                                                                    0) + 1
                show_status = ",".join([
                    "{}({})".format(status, count)
                    for status, count in service_statuses.items()
                ])
                owner = utils.get_path_owner(CLUSTER.get_cluster_path(name))
                compose_file = CLUSTER.get_compose_file(name)

                is_cloud = ""
                network_mode = ""
                try:
                    cluster = CLUSTER.Cluster.load(name)
                    is_cloud = "true" if cluster.is_cloud else "false"
                    network_mode = "host" if cluster.is_host_network(
                    ) else "bridge"
                except Exception:
                    # Unloadable meta: leave both columns blank.
                    pass

                rows.append(
                    (name, owner, show_status,
                     CLUSTER.get_master_fe_endpoint(name), is_cloud,
                     network_mode, "{}{}".format(compose_file,
                                                 cluster_info["status"])))
            return self._print_table(header, rows)

        header = [
            "CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
            "CREATED", "alive", "is_master", "backend_id", "tablet_num",
            "last_heartbeat", "err_msg"
        ]
        if args.detail:
            header += [
                "query_port",
                "http_port",
                "path",
                "edit_log_port",
                "heartbeat_port",
            ]

        rows = []
        for cluster_name in sorted(clusters.keys()):
            fe_ids = {}
            be_ids = {}
            services = clusters[cluster_name]["services"]
            cluster = None
            try:
                cluster = CLUSTER.Cluster.load(cluster_name)
            except Exception:
                # Keep going with cluster=None; containers are still listed.
                pass

            db_mgr = database.get_db_mgr(
                cluster_name,
                cluster.get_all_node_net_infos() if cluster else [], False)
            nodes = []
            for container in services.values():
                _, node_type, node_id = utils.parse_service_name(
                    container.name)
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = node_type
                node.id = node_id
                node.update_db_info(cluster, db_mgr)
                nodes.append(node)

                if node_type == CLUSTER.Node.TYPE_FE:
                    fe_ids[node_id] = True
                elif node_type == CLUSTER.Node.TYPE_BE:
                    be_ids[node_id] = True

                if isinstance(container, ComposeService):
                    node.ip = container.ip
                    node.image = container.image
                    node.status = SERVICE_DEAD
                else:
                    node.created = dateutil.parser.parse(
                        container.attrs.get("Created")).astimezone().strftime(
                            "%Y-%m-%d %H:%M:%S")
                    if cluster and cluster.is_host_network():
                        node.ip = cluster.local_network_ip
                    else:
                        # Use cluster_name, not cluster.name: `cluster` is
                        # None here when the cluster meta failed to load.
                        network_name = utils.get_network_name(cluster_name)
                        node.ip = container.attrs["NetworkSettings"]["Networks"][network_name] \
                                ["IPAMConfig"]["IPv4Address"]
                    node.image = container.attrs["Config"]["Image"]
                    if not node.image:
                        node.image = ",".join(container.image.tags)
                    node.container_id = container.short_id
                    node.status = container.status
                    # These node types have no db state; an existing
                    # container is reported as alive.
                    if node.container_id and \
                        node_type in (CLUSTER.Node.TYPE_FDB,
                                     CLUSTER.Node.TYPE_MS,
                                     CLUSTER.Node.TYPE_RECYCLE):
                        node.alive = "true"

            # FE / BE known to the db manager but with no service entry.
            for fe_id, fe in db_mgr.fe_states.items():
                if fe_ids.get(fe_id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_FE
                node.id = fe_id
                node.status = SERVICE_RUNNING if fe.alive else SERVICE_DEAD
                node.update_db_info(cluster, db_mgr)
                nodes.append(node)
            for be_id, be in db_mgr.be_states.items():
                if be_ids.get(be_id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_BE
                node.id = be_id
                node.status = SERVICE_RUNNING if be.alive else SERVICE_DEAD
                node.update_db_info(cluster, db_mgr)
                nodes.append(node)

            def get_node_seq(node):
                return CLUSTER.get_node_seq(node.node_type, node.id)

            for node in sorted(nodes, key=get_node_seq):
                rows.append(node.info(args.detail))

        ret = self._print_table(header, rows)
        if len(args.NAME) == 1 and len(rows) == 0:
            self._hint_cluster_bad(args.NAME[0])

        return ret


class InfoCommand(Command):
    """Sub-command ``info``: print paths, port constants and cloud.ini entries."""

    def add_parser(self, args_parsers):
        """Register the ``info`` sub-command on ``args_parsers``."""
        parser = args_parsers.add_parser(
            "info", help="Show info like cloud.ini, port, path, etc")
        self._add_parser_common_args(parser)

    def run(self, args):
        """Build and print a (key, value, scope) table.

        The table lists the local doris path, the cloud config file location,
        the port constants taken from the cluster module, and every
        ``key=value`` entry found in ``CLUSTER.CLOUD_CFG_FILE`` (if that file
        exists).

        Returns:
            Whatever ``self._print_table(header, rows)`` returns.
        """
        header = ["key", "value", "scope"]
        # Show the env override when set, otherwise the default location.
        cloud_cfg_file = os.getenv(
            "DORIS_CLOUD_CFG_FILE") or "${LOCAL_DORIS_PATH}/cloud.ini"
        rows = [
            ("LOCAL_DORIS_PATH", CLUSTER.LOCAL_DORIS_PATH, "env variable"),
            ("DORIS_CLOUD_CFG_FILE", cloud_cfg_file, "env variable"),
            ("FE_QUERY_PORT", CLUSTER.FE_QUERY_PORT, "constant"),
            ("FE_HTTP_PORT", CLUSTER.FE_HTTP_PORT, "constant"),
            ("FE_EDITLOG_PORT", CLUSTER.FE_EDITLOG_PORT, "constant"),
            ("FE_JAVA_DBG_PORT", CLUSTER.FE_JAVA_DBG_PORT, "constant"),
            ("BE_HEARTBEAT_PORT", CLUSTER.BE_HEARTBEAT_PORT, "constant"),
            ("BE_WEBSVR_PORT", CLUSTER.BE_WEBSVR_PORT, "constant"),
            ("MS_PORT", CLUSTER.MS_PORT, "constant"),
            # NOTE(review): RECYCLER_PORT is reported from CLUSTER.MS_PORT;
            # confirm whether the recycler really shares the MS port.
            ("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"),
        ]

        if os.path.exists(CLUSTER.CLOUD_CFG_FILE):
            with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines and comments.
                    if not line or line.startswith("#"):
                        continue
                    key, sep, value = line.partition("=")
                    if not sep:
                        # Not a key=value entry (e.g. a section header):
                        # ignore it instead of failing the whole command.
                        continue
                    rows.append((key.strip(), value.strip(), "cloud.ini"))

        return self._print_table(header, rows)


class AddRWPermCommand(Command):
    """Sub-command ``add-rw-perm``: enable rw permissions on a cluster's path."""

    def add_parser(self, args_parsers):
        """Register the ``add-rw-perm`` sub-command and its NAME argument."""
        help_text = "Add read and write permissions to the cluster files"
        sub_parser = args_parsers.add_parser("add-rw-perm", help=help_text)
        sub_parser.add_argument("NAME", help="Specify cluster name.")
        self._add_parser_common_args(sub_parser)

    def run(self, args):
        """Call ``utils.enable_dir_with_rw_perm`` on the cluster's directory.

        Returns:
            An empty string.
        """
        cluster_dir = CLUSTER.get_cluster_path(args.NAME)
        utils.enable_dir_with_rw_perm(cluster_dir)
        return ""


class RollbackCommand(Command):
    """Sub-command ``rollback``: roll a cloud cluster back to a snapshot."""

    def add_parser(self, args_parsers):
        """Register the ``rollback`` sub-command and its arguments.

        Returns:
            The created sub-parser.
        """
        parser = args_parsers.add_parser(
            "rollback",
            help="Rollback cloud cluster to a snapshot. " \
                 "Stop ALL FE/BE, clean metadata/data, and restart with new cloud_unique_id and instance_id. " \
                 "Only available for cloud mode clusters."
        )
        parser.add_argument("NAME", help="Specify cluster name.")
        parser.add_argument(
            "--cluster-snapshot",
            type=str,
            required=True,
            help="Cluster snapshot JSON content for rollback. " \
                 "Example: '{\"instance_id\":\"instance_id_xxx\"}'"
        )
        parser.add_argument(
            "--instance-id",
            type=str,
            help="New instance ID for the cluster after rollback. " \
                 "If not specified, will generate a new one based on timestamp."
        )
        parser.add_argument(
            "--wait-timeout",
            type=int,
            default=0,
            help="Specify wait seconds for fe/be ready for service: 0 not wait (default), " \
                 "> 0 max wait seconds, -1 wait unlimited."
        )
        self._add_parser_common_args(parser)
        return parser

    def _update_config_cloud_unique_id(self, conf_path, new_cloud_unique_id):
        """Rewrite the ``cloud_unique_id`` entry of a fe.conf / be.conf file.

        The first line whose key (the text before the first ``=``) is exactly
        ``cloud_unique_id`` is replaced by
        ``cloud_unique_id = <new_cloud_unique_id>``; every other line is
        written back unchanged.

        Raises:
            Exception: if the file has no ``cloud_unique_id`` entry. The file
                is not rewritten in that case.
        """
        # Read existing config
        with open(conf_path, "r") as f:
            lines = f.readlines()

        for i, line in enumerate(lines):
            # Compare the exact key so that a longer key which merely starts
            # with "cloud_unique_id" is never overwritten. Commented-out
            # entries do not match because their key starts with "#".
            key, sep, _ = line.partition("=")
            if sep and key.strip() == "cloud_unique_id":
                lines[i] = f"cloud_unique_id = {new_cloud_unique_id}\n"
                break
        else:
            # No entry to update: fail before touching the file.
            raise Exception("cloud_unique_id not found in config file: " + conf_path)

        # Write back to file
        with open(conf_path, "w") as f:
            f.writelines(lines)

    def run(self, args):
        """Execute the rollback for the cluster named ``args.NAME``.

        Steps, in order:
          1. stop every FE/BE node through docker compose;
          2. delete and recreate each FE ``doris-meta`` directory and each BE
             ``storage`` directory that exists;
          3. set the new instance id on the cluster and write the snapshot
             content to FE-1's ``conf/cluster_snapshot.json``;
          4. store ``rollback_timestamp`` in every node's meta and rewrite
             ``cloud_unique_id`` in existing fe.conf / be.conf files;
          5. save the cluster and bring the same nodes up again, then wait
             for them according to ``args.wait_timeout`` unless the cluster
             uses host networking.

        Returns:
            A success message string.

        Raises:
            Exception: if the cluster is not a cloud cluster or has neither
                FE nor BE nodes.
        """
        cluster = CLUSTER.Cluster.load(args.NAME)

        # Validate: only cloud clusters support rollback
        if not cluster.is_cloud:
            raise Exception("Rollback is only supported for cloud clusters")

        # Rollback must include ALL FE/BE nodes
        fe_nodes = cluster.get_all_nodes(CLUSTER.Node.TYPE_FE)
        be_nodes = cluster.get_all_nodes(CLUSTER.Node.TYPE_BE)

        if not fe_nodes and not be_nodes:
            raise Exception("No FE or BE nodes to rollback")

        # Generate new instance_id and rollback timestamp
        rollback_ts = str(int(time.time()))
        new_instance_id = args.instance_id or f"instance_{cluster.name}_{rollback_ts}"

        LOG.info(f"Starting rollback with instance_id: {new_instance_id}, timestamp: {rollback_ts}")

        # Step 1: Stop FE/BE nodes
        LOG.info("Step 1/5: Stopping FE/BE nodes...")
        fe_ids = [node.id for node in fe_nodes]
        be_ids = [node.id for node in be_nodes]

        stop_nodes = fe_nodes + be_nodes
        utils.exec_docker_compose_command(
            cluster.get_compose_file(),
            "stop",
            options=["-t", "1"],
            nodes=stop_nodes
        )
        LOG.info(f"Stopped {len(fe_nodes)} FE and {len(be_nodes)} BE nodes")

        cluster.is_rollback = True  # Add ROLLBACK envs

        # Step 2: Clean metadata and data
        LOG.info("Step 2/5: Cleaning metadata and data directories...")

        for fe in fe_nodes:
            fe_meta_path = os.path.join(fe.get_path(), "doris-meta")
            if os.path.exists(fe_meta_path):
                # Make the tree writable first so rmtree can remove it.
                utils.enable_dir_with_rw_perm(fe_meta_path)
                shutil.rmtree(fe_meta_path)
                os.makedirs(fe_meta_path, exist_ok=True)
                LOG.info(f"  Cleaned FE-{fe.id} doris-meta/")

        for be in be_nodes:
            be_storage_path = os.path.join(be.get_path(), "storage")
            if os.path.exists(be_storage_path):
                utils.enable_dir_with_rw_perm(be_storage_path)
                shutil.rmtree(be_storage_path)
                # Recreate storage directories based on disk configuration
                be.init_disk(cluster.be_disks)
                LOG.info(f"  Cleaned BE-{be.id} storage/")

        # Step 3: Update cluster configuration
        LOG.info("Step 3/5: Updating cluster configuration...")

        # Update instance_id
        old_instance_id = cluster.instance_id
        cluster.instance_id = new_instance_id
        LOG.info(f"  Updated instance_id: {old_instance_id} -> {new_instance_id}")

        # Update cluster_snapshot for FE-1
        cluster.cluster_snapshot = args.cluster_snapshot
        fe1 = cluster.get_node(CLUSTER.Node.TYPE_FE, 1)
        snapshot_file = os.path.join(fe1.get_path(), "conf", "cluster_snapshot.json")
        with open(snapshot_file, "w") as f:
            f.write(args.cluster_snapshot)
        LOG.info(f"  Written cluster_snapshot to {snapshot_file}")

        # Step 4: Update cloud_unique_id by setting rollback_timestamp in node meta
        LOG.info("Step 4/5: Generating new cloud_unique_id...")

        for fe in fe_nodes:
            # The timestamp is stored before cloud_unique_id() is called so
            # the logged/written id is the one computed with it in the meta.
            fe.meta["rollback_timestamp"] = rollback_ts
            old_id = f"{cluster.name}_sql_server_{fe.id}"
            new_id = fe.cloud_unique_id()
            LOG.info(f"  FE-{fe.id}: {old_id} -> {new_id}")

            fe_conf_path = os.path.join(fe.get_path(), "conf", "fe.conf")
            if os.path.exists(fe_conf_path):
                self._update_config_cloud_unique_id(fe_conf_path, new_id)

        for be in be_nodes:
            be.meta["rollback_timestamp"] = rollback_ts
            old_id = f"{cluster.name}_compute_node_{be.id}"
            new_id = be.cloud_unique_id()
            LOG.info(f"  BE-{be.id}: {old_id} -> {new_id}")

            be_conf_path = os.path.join(be.get_path(), "conf", "be.conf")
            if os.path.exists(be_conf_path):
                self._update_config_cloud_unique_id(be_conf_path, new_id)

        # Save updated cluster configuration
        cluster.save()
        LOG.info("  Saved cluster configuration")

        # Step 5: Start FE/BE nodes
        LOG.info("Step 5/5: Starting FE/BE nodes with new configuration...")

        utils.exec_docker_compose_command(
            cluster.get_compose_file(),
            "up",
            options=["-d"],
            nodes=stop_nodes
        )

        # Wait for services to be ready
        if not cluster.is_host_network():
            wait_service(True, args.wait_timeout, cluster, fe_ids, be_ids)

        LOG.info("Rollback completed successfully.")

        return "Rollback completed successfully."


# One instance of every command defined in this module. Each is constructed
# with its command name; SimpleCommand additionally takes a help string.
# NOTE(review): presumably iterated by the CLI entry point to register the
# sub-parsers and dispatch to run() — the consumer is not visible here.
ALL_COMMANDS = [
    UpCommand("up"),
    DownCommand("down"),
    StartCommand("start"),
    StopCommand("stop"),
    RestartCommand("restart"),
    SimpleCommand("pause", "Pause the doris containers. "),
    SimpleCommand("unpause", "Unpause the doris containers. "),
    GenConfCommand("config"),
    InfoCommand("info"),
    ListCommand("ls"),
    AddRWPermCommand("add-rw-perm"),
    RollbackCommand("rollback"),
]
